 1.大数据高速计算引擎Spark(上)之Spark SQL中Spark SQL原理下SQL解析过程
   
   Spark SQL 可以说是 Spark 中的精华部分。原来基于 RDD 构建大数据计算任务，重
心在向 DataSet 转移，原来基于 RDD 写的代码也在迁移。使用 Spark SQL 编码好处
是非常大的，尤其是在性能方面，有很大提升。Spark SQL 中各种内嵌的性能优化比
写 RDD 遵守各种最佳实践更靠谱的，尤其对新手来说。如先 filter 操作再 map 操
作，Spark SQL 中会自动进行谓词下推；Spark SQL中会自动使用 broadcast join 来
广播小表，把 shuffle join 转化为 map join 等等。
   Spark SQL对SQL语句的处理和关系型数据库类似，即词法/语法解析、绑定、优化、
执行。Spark SQL会先将SQL语句解析成一棵树，然后使用规则(Rule)对Tree进行绑
定、优化等处理过程。Spark SQL由Core、Catalyst、Hive、Hive-ThriftServer四部
分构成：
       Core: 负责处理数据的输入和输出，如获取数据，查询结果输出成DataFrame等
       Catalyst: 负责处理整个查询过程，包括解析、绑定、优化等
       Hive: 负责对Hive数据进行处理
       Hive-ThriftServer: 主要用于对Hive的访问
   Spark SQL的代码复杂度是问题的本质复杂度带来的，Spark SQL中的 Catalyst 框架
大部分逻辑是在一个 Tree 类型的数据结构上做各种折腾，基于 Scala 来实现还是很优
雅的，Scala 的偏函数和强大的 Case 正则匹配，让整个代码看起来非常优雅。
   
   SparkSession 是编写 Spark 应用代码的入口，启动一个 spark-shell 会提供给你一
个创建 SparkSession， 这个对象是整个 Spark 应用的起始点。以下是SparkSession的 
一些重要的变量和方法：
   
   类               功能
   catalog          通过对这个类可以操作元数据,对数据库、表、函数进
                    行增删改查,内部使用SessionCatalog完成具体操作
   table            把一个table或view包装为一个DataFrame进行后续操作
   emptyDataset/    创建空的Dataset 或 DataFrame
   emptyDataFrame
   sql              执行sql，返回一个DataFrame
   read 或          获取数据读取器，读取各种格式的数据
   readStream     
   sessionState     维护了当前session使用的所有状态数据；
                    还包括SQL解析器、分析器、优化器等

package cn.lagou.sparksql
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.{DataFrame, SparkSession}
object Plan {
	def main(args: Array[String]): Unit = {
  val spark = SparkSession
  .builder()
  .appName("Demo1")
  .master("local[*]")
  .getOrCreate()
  spark.sparkContext.setLogLevel("warn")
  import spark.implicits._
  Seq((0, "zhansan", 10),
  (1, "lisi", 11),
  (2, "wangwu", 12)).toDF("id", "name",
"age").createOrReplaceTempView("stu")
  Seq((0, "chinese", 80), (0, "math", 100), (0, "english", 98),
  (1, "chinese", 86), (1, "math", 97), (1, "english", 90),
  (2, "chinese", 90), (2, "math", 94), (2, "english", 88)
 ).toDF("id", "subject",
"score").createOrReplaceTempView("score")
  val df: DataFrame = spark.sql(
   """
    |select sum(v), name
    | from (select stu.id, 100 + 10 + score.score as v,
name
    |     from stu join score
    |     where stu.id = score.id and stu.age >= 11)
tmp
    |group by name
    |""".stripMargin)
  df.show()
  // 打印执行计划
  println(df.queryExecution)
  println(df.queryExecution.optimizedPlan)
  spark.close()
}
}
   queryExecution 就是整个执行计划的执行引擎，里面有执行过程中各个中间过程变
量，整个执行流程如下：
   上面例子中的 SQL 语句经过 Parser 解析后就会变成一个抽象语法树，对应解析后的
逻辑计划 AST 为：

== Parsed Logical Plan ==
'Aggregate ['name], [unresolvedalias('sum('v), None), 'name]
+- 'SubqueryAlias `tmp`
   +- 'Project ['stu.id, ((100 + 10) + 'score.score) AS v#26, 'name]
      +- 'Filter (('stu.id = 'score.id) && ('stu.age >= 11))
         +- 'Join Inner
            :- 'UnresolvedRelation `stu`
            +- 'UnresolvedRelation `score`
   
   备注：在执行计划中 Project/Projection 代表的意思是投影
   选投连三种最基本的操作
   其中过滤条件变为了 Filter 节点，这个节点是 UnaryNode(一元节点) 类型， 只有一
个孩子。两个表中的数据变为了 UnresolvedRelation节点，节点类型为LeafNode,即叶子
节点， JOIN 操作为节点， 这个是一个BinaryNode节点,有两个孩子。
   以上节点都是 LogicalPlan 类型的， 可以理解为进行各种操作的 Operator，
SparkSQL 对各种操作定义了各种 Operator。
   
   这些 operator 组成的抽象语法树就是整个 Catatyst 优化的基础，Catatyst 优化器会
在这个树上面进行各种折腾，把树上面的节点挪来挪去来进行优化。
   经过 Parser 有了抽象语法树，但是并不知道 score，sum 这些东西是啥，所以就需
要 analyer 来定位。
   analyzer 会把 AST 上所有 Unresolved 的东西都转变为 resolved 状态，SparkSQL
有很多resolve 规则：
        ResolverRelations。解析表（列）的基本类型等信息
        ResolveFuncions。解析出来函数的基本信息
        ResolveReferences。解析引用，通常是解析列名

== Analyzed Logical Plan ==
sum(v): bigint, name: string
Aggregate [name#8], [sum(cast(v#26 as bigint)) AS sum(v)#28L, name#8]
+- SubqueryAlias `tmp`
   +- Project [id#7, ((100 + 10) + score#22) AS v#26, name#8]
      +- Filter ((id#7 = id#20) && (age#9 >= 11))
         +- Join Inner
            :- SubqueryAlias `stu`
            :  +- Project [_1#3 AS id#7, _2#4 AS name#8, _3#5 AS age#9]
            :     +- LocalRelation [_1#3, _2#4, _3#5]
            +- SubqueryAlias `score`
               +- Project [_1#16 AS id#20, _2#17 AS subject#21, _3#18 AS score#22]
                  +- LocalRelation [_1#16, _2#17, _3#18]
   
   下面要进行逻辑优化了，常见的逻辑优化有：

== Optimized Logical Plan ==
Aggregate [name#8], [sum(cast(v#26 as bigint)) AS sum(v)#28L, name#8]
+- Project [(110 + score#22) AS v#26, name#8]
   +- Join Inner, (id#7 = id#20)
      :- LocalRelation [id#7, name#8]
      +- LocalRelation [id#20, score#22]
    
   这里用到的优化有：谓词下推(Push Down Predicate)、常量折叠(Constant
Folding)、字段裁剪(Columning Pruning)
   做完逻辑优化，还需要先转换为物理执行计划，将逻辑上可行的执行计划变为 Spark
可以真正执行的计划：
   SparkSQL 把逻辑节点转换为了相应的物理节点， 比如 Join 算子，Spark 根据不同
场景为该算子制定了不同的算法策略。

== Physical Plan ==
*(2) HashAggregate(keys=[name#8], functions=[sum(cast(v#26 as bigint))], output=[sum(v)#28L, name#8])
+- Exchange hashpartitioning(name#8, 200)
   +- *(1) HashAggregate(keys=[name#8], functions=[partial_sum(cast(v#26 as bigint))], output=[name#8, sum#38L])
      +- *(1) Project [(110 + score#22) AS v#26, name#8]
         +- *(1) BroadcastHashJoin [id#7], [id#20], Inner, BuildLeft
            :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
            :  +- LocalTableScan [id#7, name#8]
            +- LocalTableScan [id#20, score#22]
   
   数据在一个一个的 plan 中流转，然后每个 plan 里面表达式都会对数据进行处理，
就相当于经过了一个个小函数的调用处理，这里面有大量的函数调用开销。是不是可以
把这些小函数内联一下，当成一个大函数，WholeStageCodegen 就是干这事的。可以
看到最终执行计划每个节点前面有个 * 号，说明整段代码生成被启用，Project、
BroadcastHashJoin、HashAggregate 这一段都启用了整段代码生成，级联为了大函数。

   
   